-- SET sql-client.execution.result-mode=TABLEAU;
-- Run with: sql-client.sh -f flink_kafka2mysql.q

  -- NOTE(review): the column list below had no enclosing CREATE TABLE statement,
  -- so it fused with the following DDL into a single unparsable statement.
  -- It is commented out so the script parses. Its names match the aliases
  -- produced by the JSON_VALUE projection in the INSERT at the end of this
  -- script (except `type`, aliased there as resource_type); presumably it is a
  -- leftover of an earlier table layout -- TODO confirm, then restore or delete.
  --
  --   log_ts string,
  --   ts string,
  --   type string,
  --   host_id string,
  --   item_mod string,
  --   item_val string

-- Table backed by the JDBC connector: maps to the MySQL table named by
-- 'table-name' at the configured 'url'. It is the target of the INSERT
-- statement in this script and exposes two string columns.
-- NOTE(review): the username and password are stored in plain text here;
-- consider keeping this script out of shared version control or rotating
-- the credential.
CREATE TABLE tbl_hism_host_zzdetail2 (
    content  STRING,
    content2 STRING
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://hllxd101:53149/db58_zp_bi_dev',
    'table-name' = 'tbl_hism_host_zzdetail2',
    'username'   = 'zpbi_wr',
    'password'   = '39086120578ddd85'
);

-- Table backed by the Kafka connector over topic 'host_zzdetail2'. Records
-- are decoded with the 'json' format and consumption starts from the
-- earliest offset. The INSERT statement in this script reads from it.
CREATE TABLE hdp_hism_idea_dwd_host_zzdetail (
    -- time the data was received
    `log_ts`       STRING COMMENT '数据接收时间',
    -- transport trace id
    `trace_id`     STRING COMMENT '传输traceId',
    -- received length of the data record
    `package_len`  BIGINT COMMENT '数据record接收数据长度',
    -- actual length of the data after parsing
    `package_len2` BIGINT COMMENT '数据解析后实际长度',
    -- data content (a JSON document; fields are extracted with JSON_VALUE downstream)
    `item_json`    STRING COMMENT '数据内容'
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'host_zzdetail2',
    'properties.bootstrap.servers' = 'node101:9092',
    'format'                       = 'json',
    'scan.startup.mode'            = 'earliest-offset',
    -- NOTE(review): sink-side option, but this script only reads from this
    -- table -- confirm whether it is still needed.
    'sink.parallelism'             = '1'
);

-- Copy the '$.ts' and '$.value' fields of each record's item_json into the
-- JDBC table (as content / content2), skipping records whose '$.ts'
-- extraction yields NULL.
INSERT INTO tbl_hism_host_zzdetail2
SELECT
    parsed.ts       AS content,
    parsed.item_val AS content2
FROM (
    -- Extract individual fields from the JSON payload. Only ts and item_val
    -- are consumed by the outer query; the remaining columns are projected
    -- but unused.
    SELECT
        src.log_ts                           AS log_ts,
        JSON_VALUE(src.item_json, '$.ts')    AS ts,
        JSON_VALUE(src.item_json, '$.type')  AS resource_type,
        JSON_VALUE(src.item_json, '$.host')  AS host_id,
        JSON_VALUE(src.item_json, '$.mod')   AS item_mod,
        JSON_VALUE(src.item_json, '$.value') AS item_val
    FROM hdp_hism_idea_dwd_host_zzdetail AS src
) AS parsed
WHERE parsed.ts IS NOT NULL;
